{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copyright 2019 Google Inc. All Rights Reserved.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#            http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Analyzing structured data with Tensorflow Data Validation\n",
    "\n",
    "This notebook demonstrates how [TensorFlow Data Validation](https://www.tensorflow.org/tfx/data_validation/get_started) (TFDV) can be used to analyze and validate structured data, including generating descriptive statistics, inferring and fine tuning schema, checking for and fixing anomalies, and detecting drift and skew. It's important to understand your dataset's characteristics, including how it might change over time in your production pipeline. It's also important to look for anomalies in your data, and to compare your training, evaluation, and serving datasets to make sure that they're consistent. TFDV is the tool to achieve it.\n",
    "\n",
    "You are going to use a variant of Cover Type dataset. For more information about the dataset refer to [the dataset summary page.](../datasets/covertype/README.md)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Lab setup\n",
    "\n",
    "Make sure to set the Jupyter kernel for this notebook to `tfx`.\n",
    "\n",
    "### Import packages and check the versions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import tempfile\n",
    "import tensorflow as tf\n",
    "import tensorflow_data_validation as tfdv\n",
    "import time\n",
    "\n",
    "from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, DebugOptions, WorkerOptions\n",
    "from google.protobuf import text_format\n",
    "from tensorflow_metadata.proto.v0 import schema_pb2, statistics_pb2\n",
    "\n",
    "print('TensorFlow version: {}'.format(tf.__version__))\n",
    "print('TensorFlow Data Validation version: {}'.format(tfdv.__version__))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Set the GCS locations of datasets used during the lab"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "TRAINING_DATASET='gs://workshop-datasets/covertype/training/dataset.csv'\n",
    "TRAINING_DATASET_WITH_MISSING_VALUES='gs://workshop-datasets/covertype/training_missing/dataset.csv'\n",
    "EVALUATION_DATASET='gs://workshop-datasets/covertype/evaluation/dataset.csv'\n",
    "EVALUATION_DATASET_WITH_ANOMALIES='gs://workshop-datasets/covertype/evaluation_anomalies/dataset.csv'\n",
    "SERVING_DATASET='gs://workshop-datasets/covertype/serving/dataset.csv'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Set the local path to the lab's folder."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "LAB_ROOT_FOLDER='/home/mlops-labs/lab-31-tfdv-structured-data'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Configure GCP project, region, and staging bucket"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "PROJECT_ID = 'mlops-workshop'\n",
    "REGION = 'us-central1'\n",
    "STAGING_BUCKET = 'gs://{}-staging'.format(PROJECT_ID)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Computing and visualizing descriptive statistics\n",
    "\n",
    " \n",
    "TFDV can compute descriptive statistics that provide a quick overview of the data in terms of the features that are present and the shapes of their value distributions.\n",
    "\n",
    "Internally, TFDV uses Apache Beam's data-parallel processing framework to scale the computation of statistics over large datasets. For applications that wish to integrate deeper with TFDV (e.g., attach statistics generation at the end of a data-generation pipeline), the API also exposes a Beam PTransform for statistics generation.\n",
    "\n",
    "Let's start by using `tfdv.generate_statistics_from_csv` to compute statistics for the training data split.\n",
    "\n",
    "Notice that although your dataset is in Google Cloud Storage you will run you computation locally on the notebook's host, using the Beam DirectRunner. Later in the lab, you will use Cloud Dataflow to calculate statistics on a remote distributed cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_stats = tfdv.generate_statistics_from_csv(\n",
    "    data_location=TRAINING_DATASET_WITH_MISSING_VALUES\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can now use `tfdv.visualize_statistics` to create a visualization of your data. `tfdv.visualize_statistics` uses [Facets](https://pair-code.github.io/facets/) that provides succinct, interactive visualizations to aid in understanding and analyzing machine learning datasets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tfdv.visualize_statistics(train_stats)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The interactive widget you see is **Facets Overview**. \n",
    "- Numeric features and categorical features are visualized separately, including charts showing the distributions for each feature.\n",
    "- Features with missing or zero values display a percentage in red as a visual indicator that there may be issues with examples in those features. The percentage is the percentage of examples that have missing or zero values for that feature.\n",
    "- Try clicking \"expand\" above the charts to change the display\n",
    "- Try hovering over bars in the charts to display bucket ranges and counts\n",
    "- Try switching between the log and linear scales\n",
    "- Try selecting \"quantiles\" from the \"Chart to show\" menu, and hover over the markers to show the quantile percentages"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Infering Schema\n",
    "Now let's use `tfdv.infer_schema` to create a schema for the data. A schema defines constraints for the data that are relevant for ML. Example constraints include the data type of each feature, whether it's numerical or categorical, or the frequency of its presence in the data. For categorical features the schema also defines the domain - the list of acceptable values. Since writing a schema can be a tedious task, especially for datasets with lots of features, TFDV provides a method to generate an initial version of the schema based on the descriptive statistics."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Infer the schema from the training dataset statistics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "schema = tfdv.infer_schema(train_stats)\n",
    "tfdv.display_schema(schema=schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In general, TFDV uses conservative heuristics to infer stable data properties from the statistics in order to avoid overfitting the schema to the specific dataset. It is strongly advised to review the inferred schema and refine it as needed, to capture any domain knowledge about the data that TFDV's heuristics might have missed.\n",
    "\n",
    "In our case `tfdv.infer_schema` did not interpreted the `Soil_Type` and `Cover_Type` fields properly. Although both fields are encoded as integers, they should be interpreted as categorical rather than numeric. \n",
    "\n",
    "You can use TFDV to manually update the schema including, specifing which features are categorical and which ones are quantitative and defining feature domains.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Fine tune the schema\n",
    "\n",
    "You are going to modify the schema:\n",
    "- Particularize the `Soil_Type` and `Cover_Type` as categorical features. Notice that at this point you don't set the domain of `Soil_Type` as enumerating all possible values is not possible without a full scan of the dataset.  After you re-generate the statistics using the correct feature designations you can retrieve the domain from the new statistics and finalize the schema\n",
    "- Set contstraints on the values of the `Slope` feature. You know that the slope is measured in degrees of arc and should be in the 0-90 range."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tfdv.get_feature(schema, 'Soil_Type').type = schema_pb2.FeatureType.BYTES\n",
    "tfdv.set_domain(schema, 'Soil_Type', schema_pb2.StringDomain(name='Soil_Type', value=[]))\n",
    "\n",
    "tfdv.set_domain(schema, 'Cover_Type', schema_pb2.IntDomain(name='Cover_Type', min=1, max=7, is_categorical=True))\n",
    "\n",
    "tfdv.get_feature(schema, 'Slope').type = schema_pb2.FeatureType.FLOAT\n",
    "tfdv.set_domain(schema, 'Slope',  schema_pb2.FloatDomain(name='Slope', min=0, max=90))\n",
    "\n",
    "tfdv.display_schema(schema=schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generate new statistics using the updated schema."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)\n",
    "\n",
    "train_stats = tfdv.generate_statistics_from_csv(\n",
    "    data_location=TRAINING_DATASET_WITH_MISSING_VALUES,\n",
    "    stats_options=stats_options,\n",
    ")\n",
    "\n",
    "tfdv.visualize_statistics(train_stats)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Finalize the schema\n",
    "The `train_stats` object is a instance of the `statistics_pb2` class, which is a Python wrapper around the [`statistics.proto` protbuf](https://github.com/tensorflow/metadata/blob/master/tensorflow_metadata/proto/v0/statistics.proto). You can use [the protobuf Python interface](https://developers.google.com/protocol-buffers/docs/reference/python-generated) to retrieve the generated statistics, including the infered domains of categorical features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "soil_type_stats = [feature for feature in train_stats.datasets[0].features if feature.path.step[0]=='Soil_Type'][0].string_stats\n",
    "soil_type_domain = [bucket.label for bucket in soil_type_stats.rank_histogram.buckets]\n",
    "\n",
    "tfdv.set_domain(schema, 'Soil_Type', schema_pb2.StringDomain(name='Soil_Type', value=soil_type_domain))\n",
    "tfdv.display_schema(schema=schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating statistics using Cloud Dataflow\n",
    "\n",
    "Previously, you created descriptive statistics using local compute. This may work for smaller datasets. But for large datasets you may not have enough local compute power. The `tfdv.generate_statistics_*` functions can utilize `DataflowRunner` to run Beam processing on a distributed Dataflow cluster.\n",
    "\n",
    "To run TFDV on Google Cloud Dataflow, the TFDV library must be must be installed on the Dataflow workers. There are different ways to install additional packages on Dataflow. You are going to use the Python `setup.py` file approach.\n",
    "\n",
    "You also configure `tfdv.generate_statistics_from_csv` to use the final schema created in the previous steps."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Configure Dataflow settings\n",
    "Create the `setup.py` configured to install TFDV."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile setup.py\n",
    "\n",
    "from setuptools import setup\n",
    "\n",
    "setup(\n",
    "    name='tfdv',\n",
    "    description='TFDV Runtime.',\n",
    "    version='0.1',\n",
    "    install_requires=[\n",
    "      'tensorflow_data_validation==0.15.0'\n",
    "    ]\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Regenerate statistics\n",
    "Re-generate the statistics using Dataflow and the final schema. You can monitor the job progress using [Dataflow UI](https://console.cloud.google.com/dataflow)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "options = PipelineOptions()\n",
    "options.view_as(GoogleCloudOptions).project = PROJECT_ID\n",
    "options.view_as(GoogleCloudOptions).region = REGION\n",
    "options.view_as(GoogleCloudOptions).job_name = \"tfdv-{}\".format(time.strftime(\"%Y%m%d-%H%M%S\"))\n",
    "options.view_as(GoogleCloudOptions).staging_location = STAGING_BUCKET + '/staging/'\n",
    "options.view_as(GoogleCloudOptions).temp_location = STAGING_BUCKET + '/tmp/'\n",
    "options.view_as(StandardOptions).runner = 'DataflowRunner'\n",
    "options.view_as(SetupOptions).setup_file = os.path.join(LAB_ROOT_FOLDER, 'setup.py')\n",
    "\n",
    "stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)\n",
    "\n",
    "train_stats = tfdv.generate_statistics_from_csv(\n",
    "    data_location=TRAINING_DATASET_WITH_MISSING_VALUES,\n",
    "    stats_options=stats_options,\n",
    "    pipeline_options=options,\n",
    "    output_path=STAGING_BUCKET + '/output/'\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tfdv.visualize_statistics(train_stats)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Analyzing evaluation data\n",
    "\n",
    "So far we've only been looking at the training data. It's important that our evaluation data is consistent with our training data, including that it uses the same schema. It's also important that the evaluation data includes examples of roughly the same ranges of values for our numerical features as our training data, so that our coverage of the loss surface during evaluation is roughly the same as during training. The same is true for categorical features. Otherwise, we may have training issues that are not identified during evaluation, because we didn't evaluate part of our loss surface.\n",
    "\n",
    "You will now generate statistics for the evaluation split and visualize both training and evaluation splits on the same chart:\n",
    "\n",
    "- The training and evaluation datasets overlay, making it easy to compare them.\n",
    "- The charts now include a percentages view, which can be combined with log or the default linear scales.\n",
    "- Click expand on the Numeric Features chart, and select the log scale. Review the `Slope` feature, and notice the difference in the max. Will that cause problems?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)\n",
    "\n",
    "eval_stats = tfdv.generate_statistics_from_csv(\n",
    "    data_location=EVALUATION_DATASET_WITH_ANOMALIES,\n",
    "    stats_options=stats_options\n",
    ")\n",
    "\n",
    "tfdv.visualize_statistics(lhs_statistics=eval_stats, rhs_statistics=train_stats,\n",
    "                         lhs_name='EVAL DATASET', rhs_name='TRAIN_DATASET')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Checking for anomalies\n",
    "\n",
    "Does our evaluation dataset match the schema from our training dataset? This is especially important for categorical features, where we want to identify the range of acceptable values.\n",
    "\n",
    "What would happen if we tried to evaluate using data with categorical feature values that were not in our training dataset? What about numeric features that are outside the ranges in our training dataset?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "anomalies = tfdv.validate_statistics(statistics=eval_stats, schema=schema)\n",
    "tfdv.display_anomalies(anomalies)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Fixing evaluation anomalies in the schema\n",
    "\n",
    "It looks like we have some new values for `Soil_Type` and some out-of-range values for `Slope` in our evaluation data, that we didn't have in our training data. Whever it should be considered anomaly, depends on our domain knowledge of the data. If an anomaly truly indicates a data error, then the underlying data should be fixed. Otherwise, we can simply update the schema to include the values in the eval dataset.\n",
    "\n",
    "In our case, you are going to add the 5151 value to the domain of `Soil_Type` as 5151 is a valid USFS Ecological Landtype Units code representing the unspecified soil type. The out-of-range values in `Slope` are data errors and should be fixed at the source."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tfdv.get_domain(schema, 'Soil_Type').value.append('5151')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Re-validate with the updated schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "updated_anomalies = tfdv.validate_statistics(eval_stats, schema)\n",
    "tfdv.display_anomalies(updated_anomalies)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The unexpected string values error in `Soil_Type` is gone but the out-of-range error in `Slope` is still there. Let's pretend you have fixed the source and re-evaluate the evaluation split without corrupted `Slope`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)\n",
    "\n",
    "eval_stats = tfdv.generate_statistics_from_csv(\n",
    "    data_location=EVALUATION_DATASET,\n",
    "    stats_options=stats_options\n",
    ")\n",
    "updated_anomalies = tfdv.validate_statistics(eval_stats, schema)\n",
    "tfdv.display_anomalies(updated_anomalies)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tfdv.display_schema(schema=schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Schema environments\n",
    "\n",
    "In supervised learning we need to include labels in our dataset, but when we serve the model for inference the labels will not be included. In cases like that introducing slight schema variations is necessary.\n",
    "\n",
    "For example, in this dataset the `Cover_Type` feature is included as the label for training, but it's missing in the serving data. If you validate the serving data statistics against the current schema you get an anomaly\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)\n",
    "\n",
    "eval_stats = tfdv.generate_statistics_from_csv(\n",
    "    data_location=SERVING_DATASET,\n",
    "    stats_options=stats_options\n",
    ")\n",
    "serving_anomalies = tfdv.validate_statistics(eval_stats, schema)\n",
    "tfdv.display_anomalies(serving_anomalies)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Environments** can be used to address such scenarios. In particular, specific features in schema can be associated with specific environments.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "schema.default_environment.append('TRAINING')\n",
    "schema.default_environment.append('SERVING')\n",
    "tfdv.get_feature(schema, 'Cover_Type').not_in_environment.append('SERVING')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you validate the serving statistics against the serving environment in schema you will not get anomaly"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "serving_anomalies = tfdv.validate_statistics(eval_stats, schema, environment='SERVING')\n",
    "tfdv.display_anomalies(serving_anomalies)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Freezing the schema\n",
    "\n",
    "When the schema is finalized it can be saved as a textfile and managed under source control like any other code artifact."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "output_dir = os.path.join(tempfile.mkdtemp(),'covertype_schema')\n",
    "\n",
    "tf.io.gfile.makedirs(output_dir)\n",
    "schema_file = os.path.join(output_dir, 'schema.pbtxt')\n",
    "tfdv.write_schema_text(schema, schema_file)\n",
    "\n",
    "!cat {schema_file}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
